package com.atguigu.flink.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/13
 *
 类型：  时间，计数
 特征：  滚动，滑动，会话
 计算并行度： 全局，keyed

 演示： 全局滚动计数窗口
       全局滑动计数窗口

        计数窗口没有会话窗口
 */
public class Demo1_CountWindowAll
{
    public static void main(String[] args) {
        
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(2);

        AllWindowedStream<WaterSensor, GlobalWindow> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            // 滚动:size = 3(窗口中最多存3个数据)， slide = 3(每3个触发一次窗口运行)  每3个算一次，一次算3个
            //.countWindowAll(3l);
           // 滑动: size = 3(窗口中最多存3个数据)， slide = 2(每2个触发一次窗口运行)。 每2个算一次，一次算3个。重复算
           // .countWindowAll(3l,2l);
            // 滑动: size = 3(窗口中最多存3个数据)， slide = 5(每5个触发一次窗口运行)。 每5个算一次，一次算3个。漏算
            .countWindowAll(3l,5l)
           ;

        ds
            /*
                    ProcessAllWindowFunction<IN, OUT, W extends Window>
                            IN: 上游发送的类型，窗口中的元素类型。固定的
                            OUT： 你要指定，输出的类型
                           Window W： 当前是计数窗口还是时间窗口

             */
            .process(new ProcessAllWindowFunction<WaterSensor, String, GlobalWindow>()
            {
                /*
                    Context context: 获取其他信息
                    Iterable<WaterSensor> elements： 窗口中存储的数据
                    Collector<String> out: 输出
                 */
                @Override
                public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    System.out.println("Demo1_CountWindowAll.process");
                    out.collect(MyUtil.parseToList(elements).toString());
                }
            })
            //.setParallelism(10)  //全局窗口计算，无法设置并行度超过1
            .print();

        
                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
        
    }
}
